package com.cloudansys.core.flink.function;

import cn.hutool.core.date.DateUtil;
import com.cloudansys.core.constant.Const;
import com.cloudansys.core.entity.MultiDataEntity;
import lombok.SneakyThrows;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

import java.text.SimpleDateFormat;
import java.util.List;

public class OriginFrequencyWatermark implements AssignerWithPeriodicWatermarks<List<MultiDataEntity>> {
    private long currentMaxTimestamp = 0L;

    /**
     * 定义生成 Watermark 的逻辑
     * 默认 100ms 被调用一次
     */
    @Override
    public Watermark getCurrentWatermark() {
        // 最大允许的乱序时间是5s
        final long maxOutOfOrder = 5 * 1000;
        return new Watermark(currentMaxTimestamp - maxOutOfOrder);
//        return new Watermark(System.currentTimeMillis() - maxOutOfOrderness);
    }

    /**
     * 抽取 eventTime
     *
     * @param element                  流元素
     * @param previousElementTimestamp 上一个流元素的 eventTime
     * @return 当前流元素的 eventTime
     */
    @SneakyThrows
    @Override
    public long extractTimestamp(List<MultiDataEntity> element, long previousElementTimestamp) {
        // 因为同一个 List<MultiDataEntity> 中的 pickTime 是相同的
        String pickTime = element.get(0).getPickTime();
        // 获取当前记录的时间戳
        long eventTime = DateUtil.parse(pickTime, Const.FMT_TRIM_MILLI).getTime();
        // 更新最大的时间戳
        currentMaxTimestamp = Math.max(eventTime, currentMaxTimestamp);
        // 返回记录的时间戳
        return eventTime;
//        return System.currentTimeMillis();
    }
}
